###
### generate data for RDD
### one line per individual with time-to-ad, previous, next 24h news consumption
### 


library(dplyr)
library(purrr)
library(tidyr)
library(forcats)
library(stringr)
library(readr)
library(tibble)
library(data.table)
library(parallel)
library(lubridate)
library(hms)
library(magrittr)


# Run data.table single-threaded.
# NOTE(review): presumably so that the forked mclapply workers further down
# do not each spawn their own data.table threads -- confirm.
setDTthreads(1)
### SET WORKING DIRECTORY HERE 
path_to_archive <- "replication/"
setwd(path_to_archive)

# DMA-level timezone lookup: load() brings `dma_timezone` into the workspace,
# then it is trimmed to two columns, the timezone abbreviations are mapped to
# Olson names, rows without a DMA code are dropped, and the result is turned
# into a data.table.
load("data/dma_timezone.RData")
dma_timezone <- select(dma_timezone, dma_code, timezone)
dma_timezone <- mutate(
	dma_timezone,
	timezone = recode(
		timezone,
		ETZ = "America/New_York",
		CTZ = "America/Chicago",
		PTZ = "America/Los_Angeles"
	)
)
dma_timezone <- filter(dma_timezone, !is.na(dma_code))
dma_timezone <- as.data.table(dma_timezone)

# Ads data, plus two quick sanity checks; the commented lines record the
# output seen when the script was written.
ads <- readRDS("data/final_ads.rds")

dim(ads)
# [1] 286863     65

table(ads$ad_type)
# cand_house      cand_pres    cand_senate cand_statewide        outside 
#      52051          76172          65776          22935          69929 


# Total number of seconds of viewing that fall inside a window adjacent to an
# ad airing.
#
# Arguments:
#   view_beg_time  start of each viewing interval (vector)
#   view_end_time  end of each viewing interval (vector, parallel to
#                  view_beg_time)
#   ad_time        time of the ad; anchors the window
#   window         window length in hours (default 24)
#   direction      "before": the window is [ad_time - window, ad_time]
#                  "after":  the window is [ad_time, ad_time + window]
#                  When omitted, the first choice ("before") is used.
#                  Any other value is an error.
#
# Returns:
#   A single number: the summed overlap, in seconds, between the viewing
#   intervals and the window. Intervals that do not overlap the window
#   contribute 0; intervals with a missing endpoint are ignored; with no
#   intervals at all the result is 0.
#
# NOTE(review): the time arguments are assumed to be POSIXct (numeric offsets
# below are in seconds) -- confirm against the interval files.
get_time_in_window <- function(view_beg_time, view_end_time, ad_time, window=24, direction = c("before", "after")) {

	# Collapse the vector of allowed choices to exactly one value. Without
	# this, calling the function with `direction` omitted hands `if` a
	# length-2 condition, which is an error in R >= 4.2.
	direction <- match.arg(direction)

	# POSIXct arithmetic is in seconds.
	window_secs <- window * 3600

	if (direction == "before") {
		window_start <- ad_time - window_secs
		window_end <- ad_time
	} else {
		window_start <- ad_time
		window_end <- ad_time + window_secs
	}

	# Overlap of [view_beg_time, view_end_time] with the window. Units are
	# pinned to seconds so the result does not depend on difftime's automatic
	# unit selection.
	overlap_secs <- as.numeric(difftime(
		pmin(view_end_time, window_end),
		pmax(view_beg_time, window_start),
		units = "secs"
	))

	# Negative overlap means the interval lies entirely outside the window.
	sum(pmax(overlap_secs, 0), na.rm = TRUE)
}


# Build the per-device news-viewing totals around every ad aired on day `d`.
#
# Argument:
#   d  the day to process; it is formatted with as.character() to build file
#      names and shifted by -1 / +1 to pick up the neighbouring days' files
#      (NOTE(review): assumed to be a Date -- confirm).
#
# Reads:
#   data/dd/tc_groups_<d>.rds                       per-ad T/C group tables
#   data/news_intervals/news_intervals_<day>.rds    for d-1, d and d+1
#
# Returns:
#   A data.table with one row per (dma_code, channel, affiliate, program,
#   ad_time_utc, group, tunein_delta, tuneout_delta, device_id) and two
#   computed columns, news_pre and news_post: seconds of news viewing in the
#   24 hours before and after the ad (see get_time_in_window).
compute_time_watched <- function(d) {

	day_label <- as.character(d)

	cat(day_label, "\n")
	cat("\tLoading T/C group definition...\n")

	# Count the members of each group inside every ad's nested t_c table, then
	# keep only ads that have at least one treated device and at least one
	# control device of any kind.
	groups_raw <- readRDS(paste0("data/dd/tc_groups_", day_label, ".rds"))
	groups_counted <- mutate(
		groups_raw,
		n_t = map_dbl(t_c, function(members) members[group=="T", .N]),
		n_c1 = map_dbl(t_c, function(members) members[group=="C1", .N]),
		n_c2 = map_dbl(t_c, function(members) members[group=="C2", .N]),
		n_c12 = map_dbl(t_c, function(members) members[group=="C1+C2", .N])
	)
	t_c_by_ad <- filter(groups_counted, n_t > 0, (n_c1 + n_c2 + n_c12 > 0))


	cat("\tReading news viewing intervals...\n")

	# Yesterday, today and tomorrow are stacked so that the windows on either
	# side of any ad aired on day d can be filled from these three files.
	read_news_day <- function(day) {
		readRDS(paste0("data/news_intervals/news_intervals_", as.character(day), ".rds"))
	}
	news_by_day <- lapply(list(d - 1, d, d + 1), read_news_day)

	news_view <- rbindlist(news_by_day)
	news_view <- news_view[, .(device_id, event_time_utc, event_time_utc_end)]
	setkey(news_view, device_id)

	cat("\tMain ads loop...")

	# One row per (ad, device). The ad's own event_time_utc is renamed so it
	# does not collide with the viewing-interval column of the same name in
	# the join below.
	ad_devices <- unnest(t_c_by_ad, cols = c(t_c))
	ad_devices <- rename(ad_devices, ad_time_utc = event_time_utc)
	ad_devices <- as.data.table(ad_devices)
	dev_stack <- ad_devices[, .(dma_code, channel, affiliate, program, ad_time_utc, tunein_delta, tuneout_delta, group, device_id)]
	setkey(dev_stack, device_id)

	# Keyed join on device_id: every (ad, device) row is paired with each of
	# that device's viewing intervals. allow.cartesian permits the row count
	# to grow beyond data.table's default guard.
	expand_view <- news_view[dev_stack, allow.cartesian=TRUE]

	setkey(expand_view, dma_code, channel, affiliate, program, ad_time_utc, group, tunein_delta, tuneout_delta, device_id)

	# Collapse the intervals back to one row per (ad, device).
	watched <- expand_view[,
		.(news_pre = get_time_in_window(event_time_utc, event_time_utc_end, ad_time_utc, direction = "before"),
		  news_post = get_time_in_window(event_time_utc, event_time_utc_end, ad_time_utc, direction = "after")),
		by = .(dma_code, channel, affiliate, program, ad_time_utc, group, tunein_delta, tuneout_delta, device_id)]

	watched

}

# Days to process, one compute_time_watched() call per day.
dates <- seq(mdy("09/01/2012"), mdy("11/06/2012"), by = "days")

# Fan the days out over forked workers.
daily_results <- mclapply(dates, FUN=compute_time_watched, mc.cores=20)

# mclapply does not stop on worker failure: a job that raised an error comes
# back as a "try-error" object, and a job whose worker process died comes
# back as NULL. rbindlist() silently skips NULL items, so without this check
# a killed worker would produce an output file that is missing whole days.
failed <- vapply(
	daily_results,
	function(res) is.null(res) || inherits(res, "try-error"),
	logical(1)
)
if (any(failed)) {
	stop(
		"compute_time_watched failed for: ",
		paste(as.character(dates[failed]), collapse = ", "),
		call. = FALSE
	)
}

# Stack the per-day tables and write the combined dataset.
saveRDS(rbindlist(daily_results), "data/all_rd_data.rds")